// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.catalog.OlapTableFactory.MTMVParams;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.info.TableNameInfo;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVJobInfo;
import org.apache.doris.mtmv.MTMVJobManager;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRefreshSnapshot;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.nereids.rules.analysis.SessionVarGuardRewriter;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;


public class MTMV extends OlapTable {
    private static final Logger LOG = LogManager.getLogger(MTMV.class);
    // Lock behind readMvLock()/writeMvLock(); both constructors create it in fair mode.
    // Most accessors and all alter*/addTaskResult mutations of the fields below go through it.
    private ReentrantReadWriteLock mvRwLock;

    // Refresh configuration, updated through alterRefreshInfo()
    @SerializedName("ri")
    private MTMVRefreshInfo refreshInfo;
    // SQL text handed to MTMVCache.from() when a plan cache is generated
    @SerializedName("qs")
    private String querySql;
    // State, refresh state and schema change detail of this MTMV
    @SerializedName("s")
    private MTMVStatus status;
    // Deprecated; new instances are created with EnvInfo(-1L, -1L)
    @Deprecated
    @SerializedName("ei")
    private EnvInfo envInfo;
    // Job information, including the history of refresh tasks
    @SerializedName("ji")
    private MTMVJobInfo jobInfo;
    // Property map of the MTMV (grace period, workload group, use-for-rewrite, ...)
    @SerializedName("mp")
    private Map<String, String> mvProperties;
    // Relation supplied by the most recent successful task, see addTaskResult()
    @SerializedName("r")
    private MTMVRelation relation;
    // Partition information of the MTMV, including the related table (if any)
    @SerializedName("mpi")
    private MTMVPartitionInfo mvPartitionInfo;
    // Snapshots recorded on refresh; replaced by an empty snapshot in alterStatus()/processBaseViewChange()
    @SerializedName("rs")
    private MTMVRefreshSnapshot refreshSnapshot;
    // Should be updated after every refresh, not persisted
    // Cache with SessionVarGuardExpr: used when query session variables differ from MV creation variables
    private MTMVCache cacheWithGuard;
    // Cache without SessionVarGuardExpr: used when query session variables match MV creation variables
    private MTMVCache cacheWithoutGuard;
    // Incremented by alterStatus()/processBaseViewChange(); addTaskResult() rejects a non-replay task
    // whose recorded version differs from this value.
    // NOTE(review): carries no @SerializedName, so presumably not persisted - confirm
    private long schemaChangeVersion;
    // Session variables compared against the query session in getOrGenerateCache()
    @SerializedName(value = "sv")
    private Map<String, String> sessionVariables;

    /**
     * No-arg constructor for deserialization; only sets the table type and creates the MV lock.
     */
    public MTMV() {
        this.type = TableType.MATERIALIZED_VIEW;
        this.mvRwLock = new ReentrantReadWriteLock(true);
    }

    /**
     * Creates a new MTMV from the given creation parameters.
     * Status, job info, refresh snapshot and env info always start from fresh objects;
     * the remaining fields are copied from {@code params}.
     *
     * @param params creation parameters of the materialized view
     */
    MTMV(MTMVParams params) {
        super(params.tableId, params.tableName, params.schema, params.keysType,
                params.partitionInfo, params.distributionInfo);
        this.mvRwLock = new ReentrantReadWriteLock(true);
        this.type = TableType.MATERIALIZED_VIEW;

        // values taken over from the creation parameters
        this.querySql = params.querySql;
        this.refreshInfo = params.refreshInfo;
        this.mvProperties = params.mvProperties;
        this.mvPartitionInfo = params.mvPartitionInfo;
        this.relation = params.relation;
        this.sessionVariables = params.sessionVariables;

        // values that always start out fresh
        this.status = new MTMVStatus();
        this.jobInfo = new MTMVJobInfo(MTMVJobManager.MTMV_JOB_PREFIX + params.tableId);
        this.refreshSnapshot = new MTMVRefreshSnapshot();
        this.envInfo = new EnvInfo(-1L, -1L);
    }

    /**
     * Always answers {@code true} for an MTMV.
     * NOTE(review): presumably tells the planner to take the table read lock while planning - confirm
     * against the overridden method.
     */
    @Override
    public boolean needReadLockWhenPlan() {
        return true;
    }

    /**
     * @return the refresh info, read while holding the MV read lock
     */
    public MTMVRefreshInfo getRefreshInfo() {
        final MTMVRefreshInfo current;
        readMvLock();
        try {
            current = this.refreshInfo;
        } finally {
            readMvUnlock();
        }
        return current;
    }

    /**
     * @return the query SQL of this MTMV (read without taking the MV lock)
     */
    public String getQuerySql() {
        return this.querySql;
    }

    /**
     * @return the status object, read while holding the MV read lock
     */
    public MTMVStatus getStatus() {
        final MTMVStatus current;
        readMvLock();
        try {
            current = this.status;
        } finally {
            readMvUnlock();
        }
        return current;
    }

    /**
     * @return the env info stored in the deprecated {@code envInfo} field (read without the MV lock)
     */
    public EnvInfo getEnvInfo() {
        return this.envInfo;
    }

    /**
     * @return the job info, read while holding the MV read lock
     */
    public MTMVJobInfo getJobInfo() {
        final MTMVJobInfo current;
        readMvLock();
        try {
            current = this.jobInfo;
        } finally {
            readMvUnlock();
        }
        return current;
    }

    /**
     * @return the relation, read while holding the MV read lock
     */
    public MTMVRelation getRelation() {
        final MTMVRelation current;
        readMvLock();
        try {
            current = this.relation;
        } finally {
            readMvUnlock();
        }
        return current;
    }

    /**
     * Applies {@code newRefreshInfo} to the current refresh info through
     * {@code MTMVRefreshInfo.updateNotNull} while holding the MV write lock.
     *
     * @param newRefreshInfo refresh info carrying the values to apply
     * @return whatever {@code updateNotNull} returns
     */
    public MTMVRefreshInfo alterRefreshInfo(MTMVRefreshInfo newRefreshInfo) {
        writeMvLock();
        try {
            MTMVRefreshInfo updated = this.refreshInfo.updateNotNull(newRefreshInfo);
            return updated;
        } finally {
            writeMvUnlock();
        }
    }

    /**
     * Alters the state of this MTMV under the MV write lock.
     * The schema change version is incremented and the refresh snapshot is replaced by an empty one
     * before the new state and detail are applied.
     *
     * @param newStatus status carrying the new state and detail
     * @return the result of {@code MTMVStatus.updateStateAndDetail}
     */
    public MTMVStatus alterStatus(MTMVStatus newStatus) {
        writeMvLock();
        try {
            schemaChangeVersion += 1;
            refreshSnapshot = new MTMVRefreshSnapshot();
            // Only state and detail are taken from newStatus here;
            // the refresh state is changed when a task result is added.
            MTMVStatus updated = status.updateStateAndDetail(newStatus);
            return updated;
        } finally {
            writeMvUnlock();
        }
    }

    /**
     * Marks this MTMV as SCHEMA_CHANGE under the MV write lock: increments the schema change version,
     * records the given detail and replaces the refresh snapshot with an empty one.
     *
     * @param schemaChangeDetail detail text stored on the status
     */
    public void processBaseViewChange(String schemaChangeDetail) {
        writeMvLock();
        try {
            schemaChangeVersion += 1;
            final MTMVStatus current = status;
            current.setState(MTMVState.SCHEMA_CHANGE);
            current.setSchemaChangeDetail(schemaChangeDetail);
            refreshSnapshot = new MTMVRefreshSnapshot();
        } finally {
            writeMvUnlock();
        }
    }

    /**
     * Records the outcome of a refresh task on this MTMV.
     * For a successful task (outside the checkpoint thread and compatibility-check mode) both plan caches
     * are generated first, without holding the MV write lock; the remaining updates happen under that lock.
     *
     * @param task the finished refresh task
     * @param relation the relation to store when the task succeeded
     * @param partitionSnapshots snapshots to merge into the refresh snapshot
     * @param isReplay whether this call replays an already recorded result; in that case no cache is
     *         generated and the schema change version check is skipped
     * @return {@code false} when the task is rejected because the schema change version of the MTMV no
     *         longer matches the one recorded on the task, {@code true} otherwise
     */
    public boolean addTaskResult(MTMVTask task, MTMVRelation relation,
            Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots, boolean isReplay) {
        MTMVCache mtmvCacheWithGuard = null;
        MTMVCache mtmvCacheWithoutGuard = null;
        boolean needUpdateCache = false;
        if (task.getStatus() == TaskStatus.SUCCESS && !Env.isCheckpointThread()
                && !Config.enable_check_compatibility_mode) {
            needUpdateCache = true;
            try {
                // The replay thread may not have initialized the catalog yet to avoid getting stuck due
                // to connection issues such as S3, so it is directly set to null
                if (!isReplay) {
                    ConnectContext currentContext = ConnectContext.get();
                    // shouldn't do this while holding mvWriteLock
                    // TODO: these two cache compute share something same, can be simplified in future
                    mtmvCacheWithGuard = MTMVCache.from(this.getQuerySql(),
                            MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE),
                            true, true, currentContext, true);
                    mtmvCacheWithoutGuard = MTMVCache.from(this.getQuerySql(),
                            MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE),
                            true, true, currentContext, false);
                }
            } catch (Throwable e) {
                // Best effort: a failed generation leaves both caches null so that
                // getOrGenerateCache() regenerates them on demand.
                mtmvCacheWithGuard = null;
                mtmvCacheWithoutGuard = null;
                LOG.warn("generate cache failed", e);
            }
        }
        writeMvLock();
        try {
            // A non-replay task that started before a schema change / status alteration is stale.
            if (!isReplay && task.getMtmvSchemaChangeVersion() != this.schemaChangeVersion) {
                LOG.warn(
                        "addTaskResult failed, schemaChangeVersion has changed. "
                                + "mvName: {}, taskId: {}, taskSchemaChangeVersion: {}, "
                                + "mvSchemaChangeVersion: {}",
                        name, task.getTaskId(), task.getMtmvSchemaChangeVersion(), this.schemaChangeVersion);
                return false;
            }
            if (task.getStatus() == TaskStatus.SUCCESS) {
                this.status.setState(MTMVState.NORMAL);
                this.status.setSchemaChangeDetail(null);
                this.status.setRefreshState(MTMVRefreshState.SUCCESS);
                this.relation = relation;
                if (needUpdateCache) {
                    // Replace both caches with the freshly generated ones. Either may be null
                    // (replay, or generation failed above); a null cache is regenerated lazily
                    // by getOrGenerateCache().
                    this.cacheWithGuard = mtmvCacheWithGuard;
                    this.cacheWithoutGuard = mtmvCacheWithoutGuard;
                }
            } else {
                this.status.setRefreshState(MTMVRefreshState.FAIL);
            }
            // The steps below run for successful and failed tasks alike.
            this.jobInfo.addHistoryTask(task);
            compatiblePctSnapshot(partitionSnapshots);
            this.refreshSnapshot.updateSnapshots(partitionSnapshots, getPartitionNames());
            Env.getCurrentEnv().getMtmvService()
                    .refreshComplete(this, relation, task);
            return true;
        } finally {
            writeMvUnlock();
        }
    }

    /**
     * Merges the given entries into the property map of this MTMV under the MV write lock.
     *
     * @param mvProperties entries to add or overwrite
     * @return the property map of this MTMV after the merge (the internal map, not a copy)
     */
    public Map<String, String> alterMvProperties(Map<String, String> mvProperties) {
        writeMvLock();
        try {
            final Map<String, String> merged = this.mvProperties;
            merged.putAll(mvProperties);
            return merged;
        } finally {
            writeMvUnlock();
        }
    }

    /**
     * Reads the grace period property under the MV read lock.
     *
     * @return the configured value multiplied by 1000, or 0 when the property is missing or empty
     *         (presumably seconds converted to milliseconds - confirm against the property definition)
     */
    public long getGracePeriod() {
        readMvLock();
        try {
            String gracePeriod = mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
            if (StringUtils.isEmpty(gracePeriod)) {
                return 0L;
            }
            return Long.parseLong(gracePeriod) * 1000;
        } finally {
            readMvUnlock();
        }
    }

    /**
     * Reads the workload group property under the MV read lock.
     *
     * @return the configured workload group, or an empty Optional when the property is missing or empty
     */
    public Optional<String> getWorkloadGroup() {
        readMvLock();
        try {
            String workloadGroup = mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP);
            if (StringUtils.isEmpty(workloadGroup)) {
                return Optional.empty();
            }
            return Optional.of(workloadGroup);
        } finally {
            readMvUnlock();
        }
    }

    /**
     * Reads the use-for-rewrite property under the MV read lock.
     *
     * @return the parsed boolean value of the property; {@code true} when it is missing or empty
     */
    public boolean isUseForRewrite() {
        readMvLock();
        try {
            String useForRewrite = mvProperties.get(PropertyAnalyzer.PROPERTIES_USE_FOR_REWRITE);
            // a missing or empty property means the default, which is true
            return StringUtils.isEmpty(useForRewrite) || Boolean.parseBoolean(useForRewrite);
        } finally {
            readMvUnlock();
        }
    }

    /**
     * Reads the refresh partition number property under the MV read lock.
     *
     * @return the configured value when it is at least 1; otherwise (missing, empty or smaller than 1)
     *         {@code MTMVTask.DEFAULT_REFRESH_PARTITION_NUM}
     */
    public int getRefreshPartitionNum() {
        readMvLock();
        try {
            String configured = mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
            if (StringUtils.isEmpty(configured)) {
                return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM;
            }
            int parsed = Integer.parseInt(configured);
            return parsed >= 1 ? parsed : MTMVTask.DEFAULT_REFRESH_PARTITION_NUM;
        } finally {
            readMvUnlock();
        }
    }

    /**
     * Parses the excluded trigger tables property (a comma separated list) under the MV read lock.
     *
     * @return one {@code TableNameInfo} per listed entry; an empty set when the property is missing or empty
     */
    public Set<TableNameInfo> getExcludedTriggerTables() {
        Set<TableNameInfo> excluded = Sets.newHashSet();
        readMvLock();
        try {
            String rawTables = mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
            if (!StringUtils.isEmpty(rawTables)) {
                for (String tableName : rawTables.split(",")) {
                    excluded.add(new TableNameInfo(tableName));
                }
            }
            return excluded;
        } finally {
            readMvUnlock();
        }
    }

    /**
     * Parses the query-rewrite-consistency-relaxed tables property (a comma separated list)
     * under the MV read lock.
     *
     * @return one {@code TableNameInfo} per listed entry; an empty set when the property is missing or empty
     */
    public Set<TableNameInfo> getQueryRewriteConsistencyRelaxedTables() {
        Set<TableNameInfo> relaxedTables = Sets.newHashSet();
        readMvLock();
        try {
            String rawTables = mvProperties.get(PropertyAnalyzer.ASYNC_MV_QUERY_REWRITE_CONSISTENCY_RELAXED_TABLES);
            if (!StringUtils.isEmpty(rawTables)) {
                String[] tableNames = rawTables.split(",");
                for (int i = 0; i < tableNames.length; i++) {
                    relaxedTables.add(new TableNameInfo(tableNames[i]));
                }
            }
            return relaxedTables;
        } finally {
            readMvUnlock();
        }
    }

    /**
     * Returns the plan cache of this MTMV that fits the session variables of the given context,
     * generating and storing it first when it does not exist yet.
     * Called when in query; one connection context should be used per query.
     *
     * <p>Two caches are kept. The cache without guard expression serves sessions whose variables match
     * the ones stored for this MTMV. The cache with guard expression serves all other sessions.
     *
     * @param connectionContext connection context of the current query
     * @return the existing or freshly generated cache
     * @throws org.apache.doris.nereids.exceptions.AnalysisException declared for failures while the
     *         cache is being generated
     */
    public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws
            org.apache.doris.nereids.exceptions.AnalysisException {
        // A guard expression is needed exactly when the session variables of the query
        // differ from the ones stored for this MTMV.
        boolean sessionVarsMatch = SessionVarGuardRewriter.checkSessionVariablesMatch(
                connectionContext.getSessionVariable().getAffectQueryResultInPlanVariables(),
                this.sessionVariables);
        boolean needGuard = !sessionVarsMatch;

        // Fast path: hand out the matching cache when it already exists
        readMvLock();
        try {
            MTMVCache cached = needGuard ? cacheWithGuard : cacheWithoutGuard;
            if (cached != null) {
                return cached;
            }
        } finally {
            readMvUnlock();
        }

        // Slow path: generate outside of any MV lock. Concurrent callers may generate the same cache
        // more than once; this is tolerated to avoid nesting the table read lock and the MV write lock.
        MTMVCache generated = MTMVCache.from(this.getQuerySql(),
                MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE),
                true, false, connectionContext, needGuard);
        writeMvLock();
        try {
            if (needGuard) {
                this.cacheWithGuard = generated;
            } else {
                this.cacheWithoutGuard = generated;
            }
        } finally {
            writeMvUnlock();
        }
        return generated;
    }

    /**
     * @return the property map of this MTMV (the internal map, not a copy), read under the MV read lock
     */
    public Map<String, String> getMvProperties() {
        final Map<String, String> current;
        readMvLock();
        try {
            current = this.mvProperties;
        } finally {
            readMvUnlock();
        }
        return current;
    }

    /**
     * @return the partition info of this MTMV (read without taking the MV lock)
     */
    public MTMVPartitionInfo getMvPartitionInfo() {
        return this.mvPartitionInfo;
    }

    /**
     * @return the current refresh snapshot (read without taking the MV lock)
     */
    public MTMVRefreshSnapshot getRefreshSnapshot() {
        return this.refreshSnapshot;
    }

    /**
     * @return the current schema change version, read while holding the MV read lock
     */
    public long getSchemaChangeVersion() {
        final long version;
        readMvLock();
        try {
            version = this.schemaChangeVersion;
        } finally {
            readMvUnlock();
        }
        return version;
    }

    /**
     * Converts every partition item of this MTMV into its partition key description.
     * Works on the copy returned by {@code getAndCopyPartitionItems()}.
     *
     * @return mvPartitionName ==> mvPartitionKeyDesc
     */
    public Map<String, PartitionKeyDesc> generateMvPartitionDescs() {
        Map<String, PartitionKeyDesc> descsByPartitionName = Maps.newHashMap();
        getAndCopyPartitionItems().forEach(
                (partitionName, item) -> descsByPartitionName.put(partitionName, item.toPartitionKeyDesc()));
        return descsByPartitionName;
    }

    /**
     * Computes, for every partition of this MTMV, the partitions of the related tables it maps to.
     * The mapping is recalculated on every call by comparing partition key descriptions, which can
     * be costly, so call it with caution.
     * An MTMV whose partition type is SELF_MANAGE always yields an empty map.
     *
     * @return mvPartitionName ==> pctTable ==> pctPartitionName
     * @throws AnalysisException declared for failures while the related partition descriptions are generated
     */
    public Map<String, Map<MTMVRelatedTableIf, Set<String>>> calculatePartitionMappings() throws AnalysisException {
        if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
            return Maps.newHashMap();
        }
        final long startMillis = System.currentTimeMillis();
        Map<String, Map<MTMVRelatedTableIf, Set<String>>> mappings = Maps.newHashMap();
        Map<PartitionKeyDesc, Map<MTMVRelatedTableIf, Set<String>>> relatedDescs = MTMVPartitionUtil
                .generateRelatedPartitionDescs(mvPartitionInfo, mvProperties, getPartitionColumns());
        for (Entry<String, PartitionItem> mvPartition : getAndCopyPartitionItems().entrySet()) {
            PartitionKeyDesc mvPartitionDesc = mvPartition.getValue().toPartitionKeyDesc();
            // an MTMV partition without a related counterpart is mapped to an empty map
            mappings.put(mvPartition.getKey(), relatedDescs.getOrDefault(mvPartitionDesc, Maps.newHashMap()));
        }
        if (LOG.isDebugEnabled()) {
            long elapsedMillis = System.currentTimeMillis() - startMillis;
            LOG.debug("calculatePartitionMappings use [{}] mills, mvName is [{}]",
                    elapsedMillis, name);
        }
        return mappings;
    }

    /**
     * @return the history task queue held by the job info (read without taking the MV lock)
     */
    public ConcurrentLinkedQueue<MTMVTask> getHistoryTasks() {
        final MTMVJobInfo currentJobInfo = this.jobInfo;
        return currentJobInfo.getHistoryTasks();
    }

    // For test only: replaces the refresh info directly, without taking the MV write lock.
    public void setRefreshInfo(MTMVRefreshInfo refreshInfo) {
        this.refreshInfo = refreshInfo;
    }

    // For test only: replaces the query SQL directly, without taking the MV write lock.
    public void setQuerySql(String querySql) {
        this.querySql = querySql;
    }

    // For test only: replaces the status object directly, without taking the MV write lock.
    public void setStatus(MTMVStatus status) {
        this.status = status;
    }

    // For test only: replaces the job info directly, without taking the MV write lock.
    public void setJobInfo(MTMVJobInfo jobInfo) {
        this.jobInfo = jobInfo;
    }

    // For test only: replaces the whole property map directly, without taking the MV write lock.
    // The given map is stored as is (no copy).
    public void setMvProperties(Map<String, String> mvProperties) {
        this.mvProperties = mvProperties;
    }

    // For test only: replaces the relation directly, without taking the MV write lock.
    public void setRelation(MTMVRelation relation) {
        this.relation = relation;
    }

    // For test only: replaces the partition info directly, without taking the MV write lock.
    public void setMvPartitionInfo(MTMVPartitionInfo mvPartitionInfo) {
        this.mvPartitionInfo = mvPartitionInfo;
    }

    // For test only: replaces the refresh snapshot directly, without taking the MV write lock.
    public void setRefreshSnapshot(MTMVRefreshSnapshot refreshSnapshot) {
        this.refreshSnapshot = refreshSnapshot;
    }

    /**
     * @return the answer of {@code MTMVStatus.canBeCandidate()} for the current status
     */
    public boolean canBeCandidate() {
        final MTMVStatus current = getStatus();
        return current.canBeCandidate();
    }

    /**
     * Acquires the read side of the MV lock; pair with {@link #readMvUnlock()}.
     */
    public void readMvLock() {
        final ReentrantReadWriteLock.ReadLock readLock = mvRwLock.readLock();
        readLock.lock();
    }

    /**
     * Releases the read side of the MV lock acquired through {@link #readMvLock()}.
     */
    public void readMvUnlock() {
        final ReentrantReadWriteLock.ReadLock readLock = mvRwLock.readLock();
        readLock.unlock();
    }

    /**
     * Acquires the write side of the MV lock; pair with {@link #writeMvUnlock()}.
     */
    public void writeMvLock() {
        final ReentrantReadWriteLock.WriteLock writeLock = mvRwLock.writeLock();
        writeLock.lock();
    }

    /**
     * Releases the write side of the MV lock acquired through {@link #writeMvLock()}.
     */
    public void writeMvUnlock() {
        final ReentrantReadWriteLock.WriteLock writeLock = mvRwLock.writeLock();
        writeLock.unlock();
    }

    /**
     * Builds a descriptive string of this MTMV.
     * A dedicated method is used instead of toString() because callers of toString() are hard to find.
     * Job info, relation and partition info are left out when they are null.
     */
    public String toInfoString() {
        final StringBuilder info = new StringBuilder("MTMV{");
        info.append("refreshInfo=").append(refreshInfo)
                .append(", querySql='").append(querySql).append('\'')
                .append(", status=").append(status);
        if (jobInfo != null) {
            info.append(", jobInfo=").append(jobInfo.toInfoString());
        }
        info.append(", mvProperties=").append(mvProperties);
        if (relation != null) {
            info.append(", relation=").append(relation.toInfoString());
        }
        if (mvPartitionInfo != null) {
            info.append(", mvPartitionInfo=").append(mvPartitionInfo.toInfoString());
        }
        info.append(", refreshSnapshot=").append(refreshSnapshot)
                .append(", id=").append(id)
                .append(", name='").append(name).append('\'')
                .append(", qualifiedDbName='").append(qualifiedDbName).append('\'')
                .append(", comment='").append(comment).append('\'')
                .append('}');
        return info.toString();
    }

    /**
     * Previously, ID was used to store the related table of materialized views,
     * but when the catalog is deleted, the ID will change, so name is used instead.
     * The logic here is to be compatible with older versions by converting ID to name.
     *
     * <p>After the conversion the MTMV is unregistered from and registered again with the MTMV service.
     * Any failure is logged and turns the MTMV into state SCHEMA_CHANGE with an explanatory detail;
     * nothing is rethrown to the caller.
     *
     * @param catalogMgr catalog manager passed on to the conversion of partition info and relation
     */
    public void compatible(CatalogMgr catalogMgr) {
        try {
            compatibleInternal(catalogMgr);
            Env.getCurrentEnv().getMtmvService().unregisterMTMV(this);
            Env.getCurrentEnv().getMtmvService().registerMTMV(this, this.getDatabase().getId());
        } catch (Throwable e) {
            // The throwable is passed as the trailing argument so the stack trace is logged as well;
            // the message alone is often not enough to locate the failing conversion step.
            LOG.warn("MTMV compatible failed, dbName: {}, mvName: {}, errMsg: {}",
                    getDBName(), name, e.getMessage(), e);
            status.setState(MTMVState.SCHEMA_CHANGE);
            status.setSchemaChangeDetail("compatible failed, please refresh or recreate it, reason: " + e.getMessage());
        }
    }

    /**
     * Runs the compatibility conversion on partition info, relation and refresh snapshot, in that order.
     * Parts that are null are skipped.
     *
     * @param catalogMgr catalog manager handed to partition info and relation
     * @throws Exception whatever one of the conversions throws
     */
    private void compatibleInternal(CatalogMgr catalogMgr) throws Exception {
        if (null != this.mvPartitionInfo) {
            this.mvPartitionInfo.compatible(catalogMgr);
        }
        if (null != this.relation) {
            this.relation.compatible(catalogMgr);
        }
        if (null != this.refreshSnapshot) {
            // unlike the two conversions above, the snapshot conversion receives this MTMV
            this.refreshSnapshot.compatible(this);
        }
    }

    /**
     * Post-processing after deserialization: runs the parent post-processing, then applies
     * the pct snapshot compatibility conversion to the deserialized refresh snapshot.
     * The conversion is skipped when no refresh snapshot or no partition info was deserialized;
     * both are treated as optional elsewhere in this class (compatibleInternal, toInfoString).
     *
     * @throws IOException propagated from the parent post-processing
     */
    @Override
    public void gsonPostProcess() throws IOException {
        super.gsonPostProcess();
        if (refreshSnapshot == null || mvPartitionInfo == null) {
            // nothing to convert; avoids failing the whole deserialization with a NullPointerException
            return;
        }
        compatiblePctSnapshot(refreshSnapshot.getPartitionSnapshots());
    }

    /**
     * Compatibility conversion for refresh partition snapshots: for every snapshot whose
     * {@code getPartitions()} map is non-empty while its {@code getPcts()} map is empty,
     * the partitions map is registered in pcts under the related table of this MTMV.
     * Does nothing when there is no partition info, no related table or no snapshot at all.
     *
     * @param partitionSnapshots snapshots to convert in place, may be null or empty
     */
    private void compatiblePctSnapshot(Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
        // mvPartitionInfo is treated as optional elsewhere in this class (compatibleInternal, toInfoString)
        if (mvPartitionInfo == null || MapUtils.isEmpty(partitionSnapshots)) {
            return;
        }
        BaseTableInfo relatedTableInfo = mvPartitionInfo.getRelatedTableInfo();
        if (relatedTableInfo == null) {
            return;
        }
        for (MTMVRefreshPartitionSnapshot partitionSnapshot : partitionSnapshots.values()) {
            Map<String, MTMVSnapshotIf> partitions = partitionSnapshot.getPartitions();
            Map<BaseTableInfo, Map<String, MTMVSnapshotIf>> pcts = partitionSnapshot.getPcts();
            // NOTE(review): assumes getPcts() never returns null - MapUtils.isEmpty accepts null,
            // but the put below would not; confirm against MTMVRefreshPartitionSnapshot
            if (!MapUtils.isEmpty(partitions) && MapUtils.isEmpty(pcts)) {
                pcts.put(relatedTableInfo, partitions);
            }
        }
    }

    /**
     * @return the session variables stored for this MTMV (the internal map, read without the MV lock)
     */
    public Map<String, String> getSessionVariables() {
        return this.sessionVariables;
    }
}
